package cn.itcast.hello;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Author itcast
 * Desc Demonstrates a Spark Streaming WordCount written in Java.
 *
 * <p>Reads text lines from a TCP socket, splits each line on single spaces,
 * counts the occurrences of every word within each batch, and prints the
 * per-batch counts to the console.
 */
public class JavaSparkDemo02 {
    /** Host of the socket text source. */
    private static final String SOURCE_HOST = "node1";

    /** Port of the socket text source. */
    private static final int SOURCE_PORT = 9999;

    /** Length of one streaming micro-batch, in seconds. */
    private static final long BATCH_INTERVAL_SECONDS = 5;

    /**
     * Builds the streaming WordCount pipeline, starts it, and blocks until the
     * streaming context terminates.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while waiting for termination
     */
    public static void main(String[] args) throws InterruptedException {
        // 0. Prepare the environment.
        // NOTE(review): a receiver-based source occupies one thread for the receiver itself;
        // per the Spark Streaming guide the local master should provide more threads than
        // receivers, so "local[*]" on a single-core machine may never process data - confirm.
        SparkConf sparkConf = new SparkConf().setAppName("JavaSparkDemo").setMaster("local[*]");
        JavaStreamingContext jssc =
                new JavaStreamingContext(sparkConf, Durations.seconds(BATCH_INTERVAL_SECONDS));
        try {
            jssc.sparkContext().setLogLevel("WARN");

            // 1. Load the data: one stream element per line received on the socket.
            JavaReceiverInputDStream<String> lines = jssc.socketTextStream(SOURCE_HOST, SOURCE_PORT);

            // 2. Process the data: WordCount.
            JavaPairDStream<String, Integer> result = lines
                    .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                    // split(" ") produces "" for blank lines and for leading/consecutive
                    // spaces; drop those so the empty string is not counted as a word.
                    .filter(word -> !word.isEmpty())
                    .mapToPair(word -> new Tuple2<>(word, 1))
                    .reduceByKey(Integer::sum);

            // 3. Output the result of each batch.
            result.print();

            // 4. Start the computation and wait for it to stop.
            jssc.start();
            jssc.awaitTermination();
        } finally {
            // 5. Release resources. Placed in finally so the context is also stopped when
            // setup, start() or awaitTermination() throws.
            jssc.stop();
        }
    }
}
